/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.transforms;



import com.bff.gaia.unified.sdk.PipelineRunner;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.Combine.CombineFn;

import com.bff.gaia.unified.sdk.values.PCollectionView;



/**

 * This class contains combine functions that have access to {@code PipelineOptions} and side inputs

 * through {@code CombineWithContext.Context}.

 *

 * <p>{@link CombineFnWithContext} is for users to extend.

 */

public class CombineWithContext {



  /**

   * Information accessible to all methods in {@code CombineFnWithContext} and {@code

   * KeyedCombineFnWithContext}.

   */

  public abstract static class Context {

    /**

     * Returns the {@code PipelineOptions} specified with the {@link PipelineRunner} invoking this

     * {@code KeyedCombineFn}.

     */

    public abstract PipelineOptions getPipelineOptions();



    /**

     * Returns the value of the side input for the window corresponding to the main input's window

     * in which values are being combined.

     */

    public abstract <T> T sideInput(PCollectionView<T> view);

  }



  /**

   * An internal interface for signaling that a {@code GloballyCombineFn} or a {@code

   * PerKeyCombineFn} needs to access {@code CombineWithContext.Context}.

   *

   * <p>For internal use only.

   */

  public interface RequiresContextInternal {}



  /**

   * A combine function that has access to {@code PipelineOptions} and side inputs through {@code

   * CombineWithContext.Context}.

   *

   * <p>See the equivalent {@link CombineFn} for details about combine functions.

   */

  public abstract static class CombineFnWithContext<InputT, AccumT, OutputT>

      extends CombineFnBase.AbstractGlobalCombineFn<InputT, AccumT, OutputT>

      implements RequiresContextInternal {

    /**

     * Returns a new, mutable accumulator value, representing the accumulation of zero input values.

     *

     * <p>It is equivalent to {@link CombineFn#createAccumulator}, but it has additional access to

     * {@code CombineWithContext.Context}.

     */

    public abstract AccumT createAccumulator(Context c);



    /**

     * Adds the given input value to the given accumulator, returning the new accumulator value.

     *

     * <p>It is equivalent to {@link CombineFn#addInput}, but it has additional access to {@code

     * CombineWithContext.Context}.

     */

    public abstract AccumT addInput(AccumT accumulator, InputT input, Context c);



    /**

     * Returns an accumulator representing the accumulation of all the input values accumulated in

     * the merging accumulators.

     *

     * <p>It is equivalent to {@link CombineFn#mergeAccumulators}, but it has additional access to

     * {@code CombineWithContext.Context}.

     */

    public abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c);



    /**

     * Returns the output value that is the result of combining all the input values represented by

     * the given accumulator.

     *

     * <p>It is equivalent to {@link CombineFn#extractOutput}, but it has additional access to

     * {@code CombineWithContext.Context}.

     */

    public abstract OutputT extractOutput(AccumT accumulator, Context c);



    /**

     * Returns an accumulator that represents the same logical value as the input accumulator, but

     * may have a more compact representation.

     *

     * <p>It is equivalent to {@link CombineFn#compact}, but it has additional access to {@code

     * CombineWithContext.Context}.

     */

    public AccumT compact(AccumT accumulator, Context c) {

      return accumulator;

    }



    /**

     * Applies this {@code CombineFnWithContext} to a collection of input values to produce a

     * combined output value.

     */

    public OutputT apply(Iterable<? extends InputT> inputs, Context c) {

      AccumT accum = createAccumulator(c);

      for (InputT input : inputs) {

        accum = addInput(accum, input, c);

      }

      return extractOutput(accum, c);

    }



    @Override

    public OutputT defaultValue() {

      throw new UnsupportedOperationException(

          "Override this function to provide the default value.");

    }

  }

}